perf: Implement physical execution of uncorrelated scalar subqueries #21240

neilconway wants to merge 34 commits into apache:main
Conversation
```diff
-pub struct DefaultPhysicalProtoConverter;
+#[derive(Default)]
+pub struct DefaultPhysicalProtoConverter {
+    scalar_subquery_results: RefCell<Option<ScalarSubqueryResults>>,
```
I don't know the serialization/deserialization code well; would love feedback on whether this is the right way to do this.
This feels like a bit of an anti-pattern. I'm going to need a bit of time to dive into what's going on here, but hopefully will get to it either this afternoon or maybe Sunday evening.
I put up this PR targeting your branch as an explanation of what I mean.
The problem I have with adding state data to DefaultPhysicalProtoConverter is that now any time we have a custom proto converter that doesn't call the default, we will not be able to process these scalar subquery results.
Instead I think we just have to plumb this data member through the deserialization process. I haven't taken a super deep look into exactly how this ends up getting used, so there may be a better alternative. The method I used in the PR was basically to add a struct that contains all of the parts we pass through deserialization, and add the scalar_subquery_results to it.
In regards to switching from FunctionRegistry -> TaskContext that's a great change. It was done part way in recent releases for the physical side but not on the logical side. It makes perfect sense to do it the way you have on the logical side.
```rust
/// TODO: Consider overlapping computation of the subqueries with evaluating the
/// main query.
///
/// TODO: Subqueries are evaluated sequentially. Consider parallel evaluation in
/// the future.
```
Happy to address these TODOs now or in a followup PR, if folks have opinions on the best way to do this.
I implemented parallel evaluation, but I haven't done overlapping evaluation of subqueries with the main query yet.
```rust
// Create the shared results container and register it (along with
// the index map) in ExecutionProps so that `create_physical_expr`
// can resolve `Expr::ScalarSubquery` into `ScalarSubqueryExpr`
// nodes. We clone the SessionState so these are available
// throughout physical planning without mutating the caller's state.
//
// Ideally, the subquery state would live in a dedicated planning
// context rather than on ExecutionProps (which is meant for
// session-level configuration). It's here because
// `create_physical_expr` only receives `&ExecutionProps`, and
// changing that signature would be a breaking public API change.
let results: Arc<Vec<OnceLock<ScalarValue>>> =
    Arc::new((0..links.len()).map(|_| OnceLock::new()).collect());
let session_state = if links.is_empty() {
    Cow::Borrowed(session_state)
} else {
    let mut owned = session_state.clone();
    owned.execution_props_mut().subquery_indexes = index_map;
    owned.execution_props_mut().subquery_results = Arc::clone(&results);
    Cow::Owned(owned)
};
```
This seemed a bit kludgy but I couldn't think of a better way to do it; feedback/suggestions welcome.
run benchmarks

🤖 Benchmark running (GKE): comparing neilc/scalar-subquery-expr (b9bce91) to 0be5982 (merge-base) using clickbench_partitioned

🤖 Benchmark running (GKE): comparing neilc/scalar-subquery-expr (b9bce91) to 0be5982 (merge-base) using tpcds

🤖 Benchmark running (GKE): comparing neilc/scalar-subquery-expr (b9bce91) to 0be5982 (merge-base) using tpch

🤖 Benchmark completed (GKE): tpch — base (merge-base) vs. branch

run benchmark tpch10

🤖 Benchmark running (GKE): comparing neilc/scalar-subquery-expr (b9bce91) to 0be5982 (merge-base) using tpch10

🤖 Benchmark completed (GKE): clickbench_partitioned — base (merge-base) vs. branch
Is that true in practice? e.g.,
Both plan nodes are used by TPC-DS Q24, as one example of a place where we saw a slowdown without additional overlapping in `ScalarSubqueryExec`.
You are probably right that in those cases it is potentially doing many things in parallel. But I think this is not what we want ideally — we want to run as few independent pipelines as possible, and get (data) parallelism from the individual pipelines rather than executing everything at the same time.
I don't disagree 😊 But for the purposes of this PR, we will regress performance on some benchmark queries if we don't do some additional work to get the same degree of overlapping that the cross-join path gets today. Is that something we're okay with? I don't think the additional complexity to overlap subquery evaluation with main query evaluation is too bad (via …).
Yeah I think that's okay; as long as we don't regress on memory usage too much I think we should be ok! @alamb we should consider (reducing) parallelism/implicit buffering from CoalescePartitionsExec / SortPreservingMergeExec once we land morsel-driven scanning.
Seems that mainly CoalescePartitions is helpful for TPC-DS SF=1 (and slightly for TPC-H). That benchmark has very limited parallelism (because of single-rowgroup tables). SortPreservingMergeExec doesn't seem to do much so far as I can see (which I think makes sense, as it will mostly be used at the root).
@Dandandan Based on this discussion, I won't plan to implement the work to overlap main query and subquery evaluation for this PR. What do you think makes sense as a next step? We could wait to merge this PR until the morsel-driven parallelism work lands (so we can check that morsel-driven parallelism effectively recovers the parallelism that we'll lose from the simple approach in this PR), or land them separately and just make sure we verify that overall performance hasn't regressed before we ship 54. wdyt?
Any chance we can break this PR into smaller pieces (e.g. move benchmarks out, for example) to make it easier to review?
Hmmm, that might be a bit tricky. The benchmarks are pretty trivial and could easily be omitted. Here's how Claude summarizes the PR:
If it is helpful, I could prepare two PRs that have a split like:
If you think that would be easier to review, lmk.
Ok, will try and review shortly
I started reviewing this PR and will hopefully complete the review shortly
Thanks @alamb! Feel free to ping me if you have any questions or want to discuss.
alamb left a comment
I went through this PR carefully and overall I think it looks great. Thank you so much @neilconway -- the implementation makes sense and I think it moves the needle forward for subquery execution
Things I would like to see before I approve this PR:
- Why is the large file size change required
- Fix `reset_state` (see inlined comment) as I think that would be a regression
- Someone more knowledgeable than me review the changes to the datafusion-proto traits
I left a bunch of other comments/questions which I think are not required for this PR to merge but maybe is worth considering
Protobuf changes
I am not sure about the changes to the protobuf serialization / registries / etc (e.g. to take TaskContext rather than FunctionRegistry); I think @timsaucer and @milenkovicm are more clued in than I am in this area
Perhaps you could break those changes (to protobuf serialization traits) into a separate PR so it is easier for them to review / evaluate the scope of the changes
Suggested breakout
Also, breaking out the new .slt tests would help me evaluate the change introduced by this PR (see comments)
```yaml
fetch-depth: 0
- name: Check size of new Git objects
  env:
    # 1 MB ought to be enough for anybody.
```
do we really need to up the limit? this repo gets checked out a lot
What is so large that required increasing to 2MB?
```rust
match execution_props.subquery_indexes.get(sq) {
    Some(&index) => {
        let schema = sq.subquery.schema();
        let dt = schema.field(0).data_type().clone();
```
is it worth checking here that the schema actually has one field? Maybe something like

```rust
assert_or_internal_error(schema.len(), 1, "Subquery output expected to be a single field");
```
```rust
/// Shared results container for uncorrelated scalar subqueries.
///
/// Each entry corresponds to one scalar subquery, identified by its index.
```
Since this is part of the public API I recommend wrapping this in its own struct so we can evolve it more easily without breaking API changes.

Something like

```rust
struct ScalarSubqueryResults {
    // details
}

impl ScalarSubqueryResults {
    fn new(n: usize) -> Self {
        // ...
    }
}
```

```rust
pub var_providers: Option<HashMap<VarType, Arc<dyn VarProvider + Send + Sync>>>,
/// Maps each logical `Subquery` to its index in `subquery_results`.
/// Populated by the physical planner before calling `create_physical_expr`.
pub subquery_indexes: HashMap<crate::logical_plan::Subquery, usize>,
```
Codex points out that two logically equivalent subqueries (i.e., with the same SQL text) will actually be treated as different because their spans are different.

I think this is ok, and we could potentially detect and optimize away duplicated scalar subqueries in a follow-on PR (we would also have to detect volatile (random) functions etc.)
```rust
    bytes: &[u8],
    registry: &dyn FunctionRegistry,
) -> Result<Self>;

fn from_bytes_with_ctx(bytes: &[u8], ctx: &TaskContext) -> Result<Self>;
```
I think this is technically a breaking API change -- maybe we can leave the old method in there and mark it deprecated? Otherwise we should add a note to the upgrade guide
```rust
use futures::TryStreamExt;

/// Links a scalar subquery's execution plan to its index in the shared results
/// container. The [`ScalarSubqueryExec`] that owns these links populates
```
maybe we could link explicitly here to what the "shared results container" is (the TaskContext)?
```rust
/// The physical plan for the subquery.
pub plan: Arc<dyn ExecutionPlan>,
/// Index into the shared results container.
pub index: usize,
```
We could potentially take the Rust approach here and use a type wrapper like

```rust
pub struct SubqueryId(usize);
```

Mostly so any code that computes them needs to use that type explicitly.
```rust
/// All subqueries are evaluated eagerly when the first output partition is
/// requested, before any rows from the main input are produced.
///
/// TODO: Consider overlapping computation of the subqueries with evaluating the
```
yeah -- a good follow on perhaps
```rust
use futures::StreamExt;
use futures::TryStreamExt;

/// Links a scalar subquery's execution plan to its index in the shared results
```
It might also be worth pointing out that from a results perspective this is a "no-op" (it just passes the inputs through), but that it has a side effect of calculating the scalar subquery values.
```rust
}

#[tokio::test]
async fn test_single_row_subquery() -> Result<()> {
```
A lot of these tests seem to be boilerplate / the same.
Refactoring the common code into helper functions could potentially make it easier to see what is important for testing rather than common setup code.
Sorry for the massive review (though I feel somewhat justified b/c the PR was large 😆)
|
@alamb AMAZING!!! Thank you for the thorough review, I really appreciate it. I'll take a look at the comments and respond shortly.
we should have replaced |
Which issue does this PR close?

array_has #18181.

Rationale for this change
Previously, DataFusion evaluated uncorrelated scalar subqueries by transforming them into joins. This has two shortcomings:
This PR introduces physical execution of uncorrelated scalar subqueries:

- A `ScalarSubqueryExec` plan node is added to the top of any physical plan with uncorrelated subqueries: it has N+1 children, N subqueries and its "main" input, which is the rest of the query plan. The subquery expression in the parent plan is replaced with a `ScalarSubqueryExpr`.
- `ScalarSubqueryExec` manages the execution of the subqueries and stores the results in a shared "results container", which is an `Arc<Vec<OnceLock<ScalarValue>>>`. Subquery evaluation is done in parallel (for a given query level), but at present it is not overlapped with evaluation of the parent query.
- When a `ScalarSubqueryExpr` is evaluated, it fetches the result of the subquery from the results container.

This architecture makes it easy to avoid the two shortcomings described above. Performance seems roughly unchanged (benchmarks added in this PR), but in situations like #18181, we can now leverage scalar fast-paths; in the case of #18181 specifically, this improves performance from ~800 ms to ~30 ms.
What changes are included in this PR?
- `ScalarSubqueryExpr`
- `PhysicalProtoConverterExtension` to wire up `ScalarSubqueryExpr` correctly

Are these changes tested?
Yes.
Are there any user-facing changes?
At the SQL-level, scalar subqueries that returned > 1 row will now be rejected instead of producing incorrect query results.
At the API level, this PR adds several new public APIs (e.g., `ScalarSubqueryExpr`, `ScalarSubqueryExec`) and makes breaking changes to several public APIs (e.g., `parse_expr`). It also introduces a new physical plan node (and allows `Subquery` to remain in logical plans); third-party query optimization code will encounter these nodes when they wouldn't have before.